package org.budo.warehouse.logic.consumer.jdbc;

import java.util.ArrayList;
import java.util.List;

import org.budo.support.lang.util.StringUtil;
import org.budo.warehouse.logic.api.DataEntry;
import org.budo.warehouse.logic.consumer.AbstractDataConsumer;
import org.budo.warehouse.logic.util.DataEntryUtil;
import org.budo.warehouse.logic.util.PipelineUtil;

import lombok.extern.slf4j.Slf4j;

/**
 * Base class for JDBC data consumers that turns a {@link DataEntry} into
 * parameterized SQL statements, one {@code SqlUnit} per row of the entry.
 * <p>
 * Non-null column values are bound as {@code ?} parameters; null values are
 * written inline ({@code NULL} in INSERT/SET, {@code IS NULL} in WHERE).
 * Column names are quoted with backticks, and any backtick inside a name is
 * doubled so the name cannot break out of the quoted identifier.
 *
 * @author limingwei
 */
@Slf4j
public abstract class AbstractJdbcDataConsumer extends AbstractDataConsumer {
    /**
     * Builds the SQL statements for an entry according to its event type.
     *
     * @param dataEntry the entry to translate
     * @return one statement per row for INSERT, DELETE or UPDATE events
     *         (event type compared case-insensitively); {@code null} for any
     *         other event type, which is only logged
     */
    protected List<SqlUnit> buildSql(DataEntry dataEntry) {
        if ("INSERT".equalsIgnoreCase(dataEntry.getEventType())) {
            return this.insertSql(dataEntry);
        }

        if ("DELETE".equalsIgnoreCase(dataEntry.getEventType())) {
            return this.deleteSql(dataEntry);
        }

        if ("UPDATE".equalsIgnoreCase(dataEntry.getEventType())) {
            return this.updateSql(dataEntry);
        }

        // Unsupported event type: log it and signal "nothing to execute" with null.
        log.info("#34 dataEntry=" + dataEntry + ", pipeline=" + this.getPipeline() + ", dataNode=" + this.getDataNode());
        return null;
    }

    /**
     * Builds one INSERT statement for every row of the entry.
     *
     * @param dataEntry the entry whose rows are inserted
     * @return the statements, in row order
     */
    protected List<SqlUnit> insertSql(DataEntry dataEntry) {
        List<SqlUnit> sqlUnits = new ArrayList<SqlUnit>();

        int rowCount = dataEntry.getRowCount();
        for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
            sqlUnits.add(this.insertRow(dataEntry, rowIndex));
        }

        return sqlUnits;
    }

    /**
     * Builds one DELETE statement for every row of the entry.
     *
     * @param dataEntry the entry whose rows are deleted
     * @return the statements, in row order
     */
    protected List<SqlUnit> deleteSql(DataEntry dataEntry) {
        List<SqlUnit> sqlUnits = new ArrayList<SqlUnit>();

        int rowCount = dataEntry.getRowCount();
        for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
            sqlUnits.add(this.deleteRow(dataEntry, rowIndex));
        }

        return sqlUnits;
    }

    /**
     * Builds one UPDATE statement for every row of the entry.
     *
     * @param dataEntry the entry whose rows are updated
     * @return the statements, in row order
     */
    protected List<SqlUnit> updateSql(DataEntry dataEntry) {
        List<SqlUnit> sqlUnits = new ArrayList<SqlUnit>();

        int rowCount = dataEntry.getRowCount();
        for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
            sqlUnits.add(this.updateRow(dataEntry, rowIndex));
        }

        return sqlUnits;
    }

    /**
     * Assembles the INSERT statement for a single row, using the row's
     * "after" column values.
     *
     * @param dataEntry the entry containing the row
     * @param rowIndex  index of the row inside the entry
     * @return the INSERT statement and its bound parameters
     */
    protected SqlUnit insertRow(DataEntry dataEntry, int rowIndex) {
        List<Object> parameters = new ArrayList<Object>();

        List<String> fields = new ArrayList<String>();
        List<String> values = new ArrayList<String>();

        int columnCount = dataEntry.getColumnCount(rowIndex);
        for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) {
            String columnName = dataEntry.getColumnName(rowIndex, columnIndex);
            fields.add(quoteIdentifier(columnName));

            String columnValueAfter = dataEntry.getColumnValueAfter(rowIndex, columnIndex);
            if (null == columnValueAfter) {
                // Null values are written inline, so no parameter is bound for them.
                values.add("NULL");
            } else {
                values.add("?");
                parameters.add(columnValueAfter);
            }
        }

        String sql = "INSERT INTO " + PipelineUtil.targetTable(this.getPipeline(), dataEntry) + "(" + StringUtil.join(fields, ", ") + ") VALUES (" + StringUtil.join(values, ", ") + ") ";
        return new SqlUnit(sql, parameters.toArray());
    }

    /**
     * Assembles the DELETE statement for a single row; the row is located by
     * the conditions produced by {@link #where(DataEntry, int, List)}.
     *
     * @param dataEntry the entry containing the row
     * @param rowIndex  index of the row inside the entry
     * @return the DELETE statement and its bound parameters
     */
    protected SqlUnit deleteRow(DataEntry dataEntry, int rowIndex) {
        List<Object> parameters = new ArrayList<Object>();
        List<String> where = this.where(dataEntry, rowIndex, parameters);

        String sql = "DELETE FROM " + PipelineUtil.targetTable(this.getPipeline(), dataEntry) + " WHERE " + StringUtil.join(where, " AND ");
        return new SqlUnit(sql, parameters.toArray());
    }

    /**
     * Assembles the UPDATE statement for a single row.
     *
     * @param dataEntry the entry containing the row
     * @param rowIndex  index of the row inside the entry
     * @return the UPDATE statement and its bound parameters
     */
    protected SqlUnit updateRow(DataEntry dataEntry, int rowIndex) {
        List<Object> parameters = new ArrayList<Object>();

        // Order matters: SET placeholders precede WHERE placeholders in the SQL,
        // so the SET parameters must be appended to the list first.
        List<String> set = this.set(dataEntry, rowIndex, parameters);
        List<String> where = this.where(dataEntry, rowIndex, parameters);

        String sql = "UPDATE " + PipelineUtil.targetTable(this.getPipeline(), dataEntry) + " SET " + StringUtil.join(set, ", ") + " WHERE " + StringUtil.join(where, " AND ");
        return new SqlUnit(sql, parameters.toArray());
    }

    /**
     * Builds the SET assignments for a row from its "after" column values.
     *
     * @param dataEntry  the entry containing the row
     * @param rowIndex   index of the row inside the entry
     * @param parameters receives the bound value of every non-null column, in
     *                   column order
     * @return one assignment fragment per column
     */
    protected List<String> set(DataEntry dataEntry, int rowIndex, List<Object> parameters) {
        List<String> set = new ArrayList<String>();

        int columnCount = dataEntry.getColumnCount(rowIndex);
        for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) {
            String columnName = dataEntry.getColumnName(rowIndex, columnIndex);
            String columnValueAfter = dataEntry.getColumnValueAfter(rowIndex, columnIndex);
            if (null == columnValueAfter) {
                set.add(quoteIdentifier(columnName) + "=NULL");
            } else {
                set.add(quoteIdentifier(columnName) + "=?");
                parameters.add(columnValueAfter);
            }
        }
        return set;
    }

    /**
     * Builds the WHERE conditions for a row from its "before" column values.
     *
     * @param dataEntry  the entry containing the row
     * @param rowIndex   index of the row inside the entry
     * @param parameters receives the bound value of every non-null condition
     *                   column, in column order
     * @return one condition fragment per column used to locate the row
     */
    protected List<String> where(DataEntry dataEntry, int rowIndex, List<Object> parameters) {
        List<String> where = new ArrayList<String>();
        boolean hasIsKeyColumn = DataEntryUtil.hasIsKeyColumn(dataEntry, rowIndex);
        int columnCount = dataEntry.getColumnCount(rowIndex);

        for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) {
            // If the row has key columns, only those are used as conditions;
            // otherwise every column is used.
            if (hasIsKeyColumn && !dataEntry.getColumnIsKey(rowIndex, columnIndex)) {
                continue;
            }

            String columnName = dataEntry.getColumnName(rowIndex, columnIndex);
            String columnValueBefore = dataEntry.getColumnValueBefore(rowIndex, columnIndex);
            if (null == columnValueBefore) {
                // "= NULL" never matches in SQL, so null values need IS NULL.
                where.add(quoteIdentifier(columnName) + " IS NULL");
            } else {
                where.add(quoteIdentifier(columnName) + "=?");
                parameters.add(columnValueBefore);
            }
        }

        return where;
    }

    /**
     * Wraps a column name in backticks, doubling any backtick inside the name
     * so it cannot terminate the quoted identifier early.
     *
     * @param columnName the raw column name
     * @return the backtick-quoted identifier
     */
    private static String quoteIdentifier(String columnName) {
        // String.valueOf renders a null name as "null", matching what plain
        // string concatenation would produce.
        return "`" + String.valueOf(columnName).replace("`", "``") + "`";
    }
}